This notebook describes how to implement distributed TensorFlow code.

The contents of this notebook are listed below.

  1. Prepare CIFAR-10 Dataset (TFRecords Format)
  2. Define parameters
  3. Define data input pipeline
  4. Define features
  5. Define a model
  6. Define a serving function
  7. Train, evaluate and export a model
  8. Evaluation with Estimator
  9. Prediction with Exported Model
  10. Distributed Training with Cloud ML Engine

1. Prepare CIFAR-10 Dataset (TFRecords Format)


In [ ]:
import cPickle
import os
import re
import shutil
import tarfile
import tensorflow as tf

print(tf.__version__)

In [ ]:
# Name of the CIFAR-10 (python version) archive and the URL it is fetched from.
CIFAR_FILENAME = 'cifar-10-python.tar.gz'
CIFAR_DOWNLOAD_URL = 'http://www.cs.toronto.edu/~kriz/' + CIFAR_FILENAME
# Sub-folder of the data directory in which the batch files are looked up.
CIFAR_LOCAL_FOLDER = 'cifar-10-batches-py'

In [ ]:
def _download_and_extract(data_dir):
  """Fetches the CIFAR-10 archive into `data_dir` and extracts it there.

  The download goes through `maybe_download`, which receives the archive
  name, the target directory and the source URL.

  Args:
    data_dir: Local directory that receives both the archive and its contents.
  """
  tf.contrib.learn.datasets.base.maybe_download(
      CIFAR_FILENAME, data_dir, CIFAR_DOWNLOAD_URL)
  archive_path = os.path.join(data_dir, CIFAR_FILENAME)
  # Use a `with` block so the archive handle is closed even when extraction
  # fails part-way (the handle was previously left open).
  with tarfile.open(archive_path, 'r:gz') as archive:
    archive.extractall(data_dir)

In [ ]:
def _get_file_names():
  """Returns the file names expected to exist in the input_dir."""
  file_names = {}
  file_names['train'] = ['data_batch_%d' % i for i in xrange(1, 5)]
  file_names['validation'] = ['data_batch_5']
  file_names['eval'] = ['test_batch']
  return file_names

In [ ]:
def _read_pickle_from_file(filename):
  """Loads and returns the object pickled in `filename`.

  Args:
    filename: Path of the pickle file, opened through `tf.gfile`.

  Returns:
    The unpickled object.
  """
  # Pickle streams are binary, so open in 'rb' rather than text mode 'r'.
  # NOTE(security): unpickling can execute arbitrary code; only use this on
  # files from a trusted source.
  with tf.gfile.Open(filename, 'rb') as f:
    return cPickle.load(f)

In [ ]:
def _convert_to_tfrecord(input_files, output_file):
  """Writes every (image, label) pair of the pickled batches to one TFRecords file.

  Args:
    input_files: Paths of the pickled batch files to read.
    output_file: Path of the TFRecords file to write.
  """
  print('Generating %s' % output_file)
  with tf.python_io.TFRecordWriter(output_file) as writer:
    for batch_path in input_files:
      batch = _read_pickle_from_file(batch_path)
      pixels, labels = batch['data'], batch['labels']
      # One Example per label; the matching image row is looked up by index.
      for row, label in enumerate(labels):
        feature_map = {
            'image': _bytes_feature(pixels[row].tobytes()),
            'label': _int64_feature(label),
        }
        record = tf.train.Example(
            features=tf.train.Features(feature=feature_map))
        writer.write(record.SerializeToString())

In [ ]:
def _int64_feature(value):
  """Wraps a single integer in a `tf.train.Feature` backed by an `Int64List`."""
  int_list = tf.train.Int64List(value=[value])
  return tf.train.Feature(int64_list=int_list)

In [ ]:
def _bytes_feature(value):
  """Wraps a single value in a `tf.train.Feature` backed by a `BytesList`.

  Args:
    value: The payload. Byte strings are stored unchanged; any other value
      is converted with `str()` first.
  """
  # Pass byte strings through untouched: on Python 3, str(b'..') yields the
  # "b'...'" repr instead of the raw bytes. On Python 2 `bytes` is `str`, so
  # behaviour there is unchanged.
  if not isinstance(value, bytes):
    value = str(value).encode('utf-8')
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

In [ ]:
def create_tfrecords_files(data_dir='cifar-10'):
  """Downloads CIFAR-10 and writes one `<split>.tfrecords` file per split.

  Args:
    data_dir: Directory holding the downloaded archive, the extracted batches
      and the generated TFRecords files.
  """
  _download_and_extract(data_dir)
  batches_dir = os.path.join(data_dir, CIFAR_LOCAL_FOLDER)

  for split, batch_names in _get_file_names().items():
    source_paths = [os.path.join(batches_dir, name) for name in batch_names]
    target_path = os.path.join(data_dir, split + '.tfrecords')
    # Best-effort removal of a previous output; failures are ignored.
    try:
      os.remove(target_path)
    except OSError:
      pass
    # Serialize the split as tf.train.Example records.
    _convert_to_tfrecord(source_paths, target_path)

In [ ]:
# Generate the train/validation/eval TFRecords files under the default 'cifar-10' directory.
create_tfrecords_files()

2. Define parameters


In [ ]:
class FLAGS():
  """Plain namespace for the hyper-parameters used by the rest of the notebook."""
  batch_size = 200              # examples per training / validation batch
  max_steps = 1000              # training stops after this many steps
  eval_steps = 100              # batches consumed per evaluation run
  save_checkpoints_steps = 100  # checkpoint interval given to RunConfig
  tf_random_seed = 19851211     # random seed given to RunConfig
  model_name = 'cnn-model-02'   # sub-directory name under trained_models/
  use_checkpoint = False        # when False, previous artifacts are deleted before training

In [ ]:
# Image dimensions (height x width x channels) and number of label classes.
IMAGE_HEIGHT = 32
IMAGE_WIDTH = 32
IMAGE_DEPTH = 3
NUM_CLASSES = 10

3. Define data input pipeline


In [ ]:
def parse_record(serialized_example):
  """Decodes one serialized `tf.train.Example` into an image and a one-hot label.

  Args:
    serialized_example: Serialized Example carrying a bytes 'image' feature
      and an int64 'label' feature.

  Returns:
    A tuple `(image, label)`: `image` is a float32 tensor laid out as
    [height, width, depth]; `label` is a one-hot vector of length NUM_CLASSES.
  """
  feature_spec = {
      'image': tf.FixedLenFeature([], tf.string),
      'label': tf.FixedLenFeature([], tf.int64),
  }
  parsed = tf.parse_single_example(serialized_example, features=feature_spec)

  # The raw bytes are interpreted as [depth, height, width] and then
  # transposed to [height, width, depth].
  flat_pixels = tf.decode_raw(parsed['image'], tf.uint8)
  flat_pixels.set_shape([IMAGE_DEPTH * IMAGE_HEIGHT * IMAGE_WIDTH])
  depth_major = tf.reshape(flat_pixels, [IMAGE_DEPTH, IMAGE_HEIGHT, IMAGE_WIDTH])
  image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32)

  label = tf.one_hot(tf.cast(parsed['label'], tf.int32), NUM_CLASSES)
  return image, label

In [ ]:
def preprocess_image(image, is_training=False):
  """Standardizes a single [height, width, depth] image, augmenting it first when training.

  Args:
    image: A single image tensor of layout [height, width, depth].
    is_training: When True, random crop and horizontal flip are applied
      before standardization.

  Returns:
    The standardized image tensor.
  """
  if is_training:
    # Pad to (IMAGE_HEIGHT + 8) x (IMAGE_WIDTH + 8), i.e. four extra pixels
    # on each side.
    padded = tf.image.resize_image_with_crop_or_pad(
        image, IMAGE_HEIGHT + 8, IMAGE_WIDTH + 8)
    # Take a random [IMAGE_HEIGHT, IMAGE_WIDTH] window back out of the padded image.
    cropped = tf.random_crop(padded, [IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH])
    # Mirror the window left-right at random.
    image = tf.image.random_flip_left_right(cropped)

  # Per-image normalization via tf.image.per_image_standardization.
  return tf.image.per_image_standardization(image)

In [ ]:
def generate_input_fn(file_names, mode=tf.estimator.ModeKeys.EVAL, batch_size=1):
  """Builds an Estimator `input_fn` that reads examples from TFRecords files.

  Args:
    file_names: TFRecords file path(s) handed to `tf.data.TFRecordDataset`.
    mode: A `tf.estimator.ModeKeys` value; TRAIN enables shuffling and the
      training-time image augmentation.
    batch_size: Number of examples per batch.

  Returns:
    A zero-argument function returning `(features, labels)`, where `features`
    is `{'images': <image batch>}` and `labels` is the one-hot label batch.
  """
  def _input_fn():
    dataset = tf.data.TFRecordDataset(filenames=file_names)

    is_training = (mode == tf.estimator.ModeKeys.TRAIN)
    if is_training:
      # NOTE(review): the shuffle buffer only spans about two batches, so
      # shuffling is local; consider a larger buffer if memory allows.
      buffer_size = batch_size * 2 + 1
      dataset = dataset.shuffle(buffer_size=buffer_size)

    # Transformation
    dataset = dataset.map(parse_record)
    dataset = dataset.map(
      lambda image, label: (preprocess_image(image, is_training), label))

    # The dataset repeats indefinitely; callers bound it with step counts.
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    # prefetch() counts dataset elements, and after batch() one element is a
    # whole batch. Buffering `2 * batch_size` batches wasted a lot of memory;
    # two batches are enough to overlap input processing with the model step.
    dataset = dataset.prefetch(2)

    images, labels = dataset.make_one_shot_iterator().get_next()

    features = {'images': images}
    return features, labels

  return _input_fn

4. Define features


In [ ]:
def get_feature_columns():
  """Returns a dict mapping feature name to feature column (only 'images')."""
  image_shape = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH)
  return {
    'images': tf.feature_column.numeric_column('images', image_shape),
  }

In [ ]:
# Sanity check: build the feature columns once and show them.
feature_columns = get_feature_columns()
print("Feature Columns: {}".format(feature_columns))

5. Define a model


In [ ]:
def inference(images):
  """Builds the CNN and returns the unscaled class logits.

  The network is two conv/pool/LRN stages followed by two ReLU dense layers
  and a final linear layer with NUM_CLASSES units.

  Args:
    images: Image batch tensor; `model_fn` passes it reshaped to
      [-1, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH].

  Returns:
    A logits tensor with NUM_CLASSES values per example.
  """
  # 1st Convolutional Layer (conv -> pool -> local response normalization)
  conv1 = tf.layers.conv2d(
      inputs=images, filters=64, kernel_size=[5, 5], padding='same',
      activation=tf.nn.relu, name='conv1')
  pool1 = tf.layers.max_pooling2d(
      inputs=conv1, pool_size=[3, 3], strides=2, name='pool1')
  norm1 = tf.nn.lrn(
      pool1, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm1')

  # 2nd Convolutional Layer (conv -> local response normalization -> pool)
  conv2 = tf.layers.conv2d(
      inputs=norm1, filters=64, kernel_size=[5, 5], padding='same',
      activation=tf.nn.relu, name='conv2')
  norm2 = tf.nn.lrn(
      conv2, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm2')
  pool2 = tf.layers.max_pooling2d(
      inputs=norm2, pool_size=[3, 3], strides=2, name='pool2')

  # Flatten Layer: collapse height, width and channels into one axis.
  shape = pool2.get_shape()
  pool2_ = tf.reshape(pool2, [-1, shape[1]*shape[2]*shape[3]])

  # 1st Fully Connected Layer
  dense1 = tf.layers.dense(
      inputs=pool2_, units=384, activation=tf.nn.relu, name='dense1')

  # 2nd Fully Connected Layer
  dense2 = tf.layers.dense(
      inputs=dense1, units=192, activation=tf.nn.relu, name='dense2')

  # 3rd Fully Connected Layer (Logits)
  # The logits must stay linear: the softmax and the softmax cross-entropy
  # loss applied downstream expect unscaled scores, and a ReLU here would
  # clamp every negative score to zero.
  logits = tf.layers.dense(
      inputs=dense2, units=NUM_CLASSES, activation=None, name='logits')

  return logits

In [ ]:
def model_fn(features, labels, mode, params):
  """Estimator model function covering the TRAIN, EVAL and PREDICT modes.

  Args:
    features: Dict of input tensors containing the 'images' entry.
    labels: One-hot label batch (read only in TRAIN and EVAL).
    mode: A `tf.estimator.ModeKeys` value.
    params: Not read by this function.

  Returns:
    A `tf.estimator.EstimatorSpec` matching `mode`.
  """
  mode_keys = tf.estimator.ModeKeys
  is_predict = (mode == mode_keys.PREDICT)
  is_train = (mode == mode_keys.TRAIN)
  is_eval = (mode == mode_keys.EVAL)

  # Turn the feature dict into a dense image batch.
  columns = list(get_feature_columns().values())
  dense_input = tf.feature_column.input_layer(
    features=features, feature_columns=columns)
  image_batch = tf.reshape(
    dense_input, shape=(-1, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH))

  # Run the CNN.
  logits = inference(image_batch)

  # Prediction tensors are shared by PREDICT and EVAL.
  if is_predict or is_eval:
    predicted_indices = tf.argmax(input=logits, axis=1)
    probabilities = tf.nn.softmax(logits, name='softmax_tensor')

  # Loss tensors are shared by TRAIN and EVAL.
  if is_train or is_eval:
    global_step = tf.train.get_or_create_global_step()
    label_indices = tf.argmax(input=labels, axis=1)
    loss = tf.losses.softmax_cross_entropy(
        onehot_labels=labels, logits=logits)
    tf.summary.scalar('cross_entropy', loss)

  if is_predict:
    predictions = {
        'classes': predicted_indices,
        'probabilities': probabilities
    }
    # The 'predictions' key is the signature name of the exported model.
    export_outputs = {
        'predictions': tf.estimator.export.PredictOutput(predictions)
    }
    return tf.estimator.EstimatorSpec(
        mode, predictions=predictions, export_outputs=export_outputs)

  if is_train:
    adam = tf.train.AdamOptimizer(learning_rate=0.001)
    train_op = adam.minimize(loss, global_step=global_step)
    return tf.estimator.EstimatorSpec(
        mode, loss=loss, train_op=train_op)

  if is_eval:
    accuracy = tf.metrics.accuracy(label_indices, predicted_indices)
    return tf.estimator.EstimatorSpec(
        mode, loss=loss, eval_metric_ops={'accuracy': accuracy})

6. Define a serving function


In [ ]:
def serving_input_fn():
  """Builds the serving receiver: a float32 image batch that is preprocessed per image.

  Returns:
    A `ServingInputReceiver` whose receiver tensor 'images' is a placeholder of
    shape [None, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH] and whose feature
    'images' is that batch mapped through `preprocess_image`.
  """
  raw_images = tf.placeholder(
    shape=[None, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH], dtype=tf.float32)
  # preprocess_image works on one image, so map it over the batch axis.
  processed_images = tf.map_fn(preprocess_image, raw_images)
  return tf.estimator.export.ServingInputReceiver(
    {'images': processed_images}, {'images': raw_images})

7. Train, evaluate and export a model


In [ ]:
# Output directory for checkpoints/exports, and the TFRecords file used for
# each split (training, validation during training, final test).
model_dir = 'trained_models/{}'.format(FLAGS.model_name)
train_data_files = ['cifar-10/train.tfrecords']
valid_data_files = ['cifar-10/validation.tfrecords']
test_data_files = ['cifar-10/eval.tfrecords']

In [ ]:
# Checkpoint interval, random seed and output directory for the Estimator.
run_config = tf.estimator.RunConfig(
  save_checkpoints_steps=FLAGS.save_checkpoints_steps,
  tf_random_seed=FLAGS.tf_random_seed,
  model_dir=model_dir
)

estimator = tf.estimator.Estimator(model_fn=model_fn, config=run_config)

# Exporter that is attached to the EvalSpec below; it is named 'Servo' and
# keeps at most 5 exports.
# There is another Exporter named FinalExporter
exporter = tf.estimator.LatestExporter(
  name='Servo',
  serving_input_receiver_fn=serving_input_fn,
  assets_extra=None,
  as_text=False,
  exports_to_keep=5)

# Training reads the train split in TRAIN mode for at most FLAGS.max_steps steps.
train_spec = tf.estimator.TrainSpec(
  input_fn=generate_input_fn(file_names=train_data_files,
                             mode=tf.estimator.ModeKeys.TRAIN,
                             batch_size=FLAGS.batch_size),
  max_steps=FLAGS.max_steps)

# Evaluation reads the validation split for FLAGS.eval_steps batches and
# runs the exporter defined above.
eval_spec = tf.estimator.EvalSpec(
  input_fn=generate_input_fn(file_names=valid_data_files,
                             mode=tf.estimator.ModeKeys.EVAL,
                             batch_size=FLAGS.batch_size),
  steps=FLAGS.eval_steps, exporters=exporter)

In [ ]:
# Start from scratch unless FLAGS.use_checkpoint is set: delete everything
# previously written to model_dir (a missing directory is ignored).
if not FLAGS.use_checkpoint:
  print("Removing previous artifacts...")
  shutil.rmtree(model_dir, ignore_errors=True)

# Run training with interleaved evaluation as configured by the two specs.
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

8. Evaluation with Estimator


In [ ]:
# Evaluate on the test split. With batch_size=1000 and steps=1 only a single
# batch of 1000 examples is scored.
test_input_fn = generate_input_fn(file_names=test_data_files,
                                  mode=tf.estimator.ModeKeys.EVAL,
                                  batch_size=1000)
# A fresh Estimator built with the same run_config (and hence the same
# model_dir) — presumably it restores the latest checkpoint; TODO confirm.
estimator = tf.estimator.Estimator(model_fn=model_fn, config=run_config)
print(estimator.evaluate(input_fn=test_input_fn, steps=1))

9. Prediction with Exported Model


In [ ]:
export_dir = model_dir + '/export/Servo/'
# os.listdir() returns entries in arbitrary order, so taking the last entry
# does not reliably give the newest export. Exports are written to
# sub-directories named by numeric timestamp: keep only all-digit names
# (skipping any leftover temporary directory) and take the largest one.
saved_model_dir = os.path.join(
  export_dir, max(filter(str.isdigit, os.listdir(export_dir)), key=int))

# Load the 'predictions' signature, the key used in model_fn's export_outputs.
predictor_fn = tf.contrib.predictor.from_saved_model(
  export_dir = saved_model_dir,
  signature_def_key='predictions')

In [ ]:
import numpy

# Load the raw test batch directly from the extracted archive.
data_dict = _read_pickle_from_file('cifar-10/cifar-10-batches-py/test_batch')

# Score only the first N test images.
N = 1000
# Rows are reshaped to [N, depth, height, width] and transposed to the
# [N, height, width, depth] layout that the serving input expects.
images = data_dict['data'][:N].reshape([N, 3, 32, 32]).transpose([0, 2, 3, 1])
labels = data_dict['labels'][:N]

# Accuracy = fraction of images whose predicted class equals the label.
output = predictor_fn({'images': images})
accuracy = numpy.sum(
  [ans==ret for ans, ret in zip(labels, output['classes'])]) / float(N)

print(accuracy)

10. Distributed Training with Cloud ML Engine

a. Set environment variables


In [ ]:
import os

PROJECT = 'YOUR-PROJECT-ID' # REPLACE WITH YOUR PROJECT ID
BUCKET = 'YOUR-BUCKET-NAME' # REPLACE WITH YOUR BUCKET NAME
REGION = 'BUCKET-REGION' # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Export the settings as environment variables so that the %%bash cells
# below can read them as $PROJECT, $BUCKET and $REGION.
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION

In [ ]:
%%bash
# Make the project and region chosen above the gcloud defaults.
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

b. Grant the Cloud ML Engine service account access to BUCKET (NOTE: create the bucket beforehand)


In [ ]:
%%bash

PROJECT_ID=$PROJECT
AUTH_TOKEN=$(gcloud auth print-access-token)

# Ask the Cloud ML API for the project's service account and extract the
# 'serviceAccount' field from the JSON response. print(...) is written with
# parentheses so the one-liner runs under both Python 2 and Python 3.
SVC_ACCOUNT=$(curl -X GET -H "Content-Type: application/json" \
    -H "Authorization: Bearer $AUTH_TOKEN" \
    https://ml.googleapis.com/v1/projects/${PROJECT_ID}:getConfig \
    | python -c "import json; import sys; response = json.load(sys.stdin); \
    print(response['serviceAccount'])")

# Give the service account read access (default ACL and existing objects)
# and write access on the bucket.
echo "Authorizing the Cloud ML Service account $SVC_ACCOUNT to access files in $BUCKET"
gsutil -m defacl ch -u $SVC_ACCOUNT:R gs://$BUCKET
gsutil -m acl ch -u $SVC_ACCOUNT:R -r gs://$BUCKET  # error message (if bucket is empty) can be ignored
gsutil -m acl ch -u $SVC_ACCOUNT:W gs://$BUCKET

c. Copy TFRecords files to GCS BUCKET


In [ ]:
%%bash

echo ${BUCKET}
# Remove any previous copy, then upload the locally generated TFRecords files.
gsutil -m rm -rf gs://${BUCKET}/cifar-10
gsutil -m cp cifar-10/*.tfrecords gs://${BUCKET}/cifar-10

d. Run distributed training with Cloud MLE


In [ ]:
%%bash
# Output location and a timestamped job name for the config_3cpu.yaml run.
OUTDIR=gs://$BUCKET/trained_models_3cpu
JOBNAME=sm_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME

# Clear previous outputs, then submit the job. Arguments after the bare `--`
# are passed to the trainer module. The wildcard patterns are quoted so the
# local shell never expands or drops them (e.g. under nullglob/failglob);
# they must reach the trainer literally.
gsutil -m rm -rf "$OUTDIR"
gcloud ml-engine jobs submit training "$JOBNAME" \
   --region="$REGION" \
   --module-name=cnn-model-02.task \
   --package-path="$(pwd)/trainer/cnn-model-02" \
   --job-dir="$OUTDIR" \
   --staging-bucket="gs://$BUCKET" \
   --config=config_3cpu.yaml \
   --runtime-version=1.4 \
   -- \
   --bucket_name="$BUCKET" \
   --train_data_pattern='cifar-10/train*.tfrecords' \
   --eval_data_pattern='cifar-10/eval*.tfrecords' \
   --output_dir="$OUTDIR" \
   --max_steps=10000

In [ ]:
%%bash
# Output location and a timestamped job name for the config_3gpu.yaml run.
OUTDIR=gs://$BUCKET/trained_models_3gpu
JOBNAME=sm_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME

# Clear previous outputs, then submit the job. Arguments after the bare `--`
# are passed to the trainer module. The wildcard patterns are quoted so the
# local shell never expands or drops them (e.g. under nullglob/failglob);
# they must reach the trainer literally.
gsutil -m rm -rf "$OUTDIR"
gcloud ml-engine jobs submit training "$JOBNAME" \
   --region="$REGION" \
   --module-name=cnn-model-02.task \
   --package-path="$(pwd)/trainer/cnn-model-02" \
   --job-dir="$OUTDIR" \
   --staging-bucket="gs://$BUCKET" \
   --config=config_3gpu.yaml \
   --runtime-version=1.4 \
   -- \
   --bucket_name="$BUCKET" \
   --train_data_pattern='cifar-10/train*.tfrecords' \
   --eval_data_pattern='cifar-10/eval*.tfrecords' \
   --output_dir="$OUTDIR" \
   --max_steps=10000